SpringCloud Stream消息驱动
参考资料
参考资料 干货|Spring Cloud Stream 体系及原理介绍 参考资料 SpringCloud 进阶: 消息驱动(入门)Spring Cloud Stream 参考资料 Spring Cloud 系列之 Spring Cloud Stream 参考资料 官方文档 参考资料 Spring Cloud 系列之 Spring Cloud Stream
SpringCloud Stream 是什么?
Spring Cloud Stream 就是用于整合消息中间件的框架,它可以用于集成 kafka 和 RabbitMQ,使之不用关注 MQ 的细节差异,直接使用
消息中间的几大应用场景
异步处理
比如用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就可以异步完成。因为下单付款是核心业务,发邮件和短信并不属于核心功能,并且可能耗时较长,所以针对这种业务场景可以选择先放到消息队列中,有其他服务来异步处理。
应用解耦
假设公司有几个不同的系统,各系统在某些业务有联动关系,比如 A 系统完成了某些操作,需要触发 B 系统及 C 系统。如果 A 系统完成操作,主动调用 B 系统的接口或 C 系统的接口,可以完成功能,但是各个系统之间就产生了耦合。用消息中间件就可以完成解耦,当 A 系统完成操作将数据放进消息队列,B 和 C 系统去订阅消息就可以了。这样各系统只要约定好消息的格式就好了。
流量削峰
比如秒杀活动,一下子进来好多请求,有的服务可能承受不住瞬时高并发而崩溃,所以针对这种瞬时高并发的场景,在中间加一层消息队列,把请求先入队列,然后再把队列中的请求平滑的推送给服务,或者让服务去队列拉取。
日志处理
kafka 最开始就是专门为了处理日志产生的。
当碰到上面的几种情况的时候,就要考虑用消息队列了。如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同样也是在使用 Spring Cloud ,那可以考虑下用 Spring Cloud Stream。
SpringCloud Stream 执行流程
Spring Cloud Stream 执行流程图
1、Middleware: 消息中间件,如 RabbitMq 等
2、Binder:可以认为是适配器,用来将 Stream 与中间连接起来的,不同的 Binder 对应不同的中间件,需要自己配置
3、Application Core:由 Stream 封装的消息机制,很少情况下自定义开发
4、inputs:输入,可以自定义开发
5、outputs:输出,可以自定义开发
再来认识一下 Spring Cloud Stream 中的几个重要概念。
Destination Binders:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder
,如果操作的是 RabbitMQ 就使用 rabbitmq binder
。就是靠它屏蔽了底层消息中间件的差异,降低切换成本,统一消息的编程模型
Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的 “生产者”和“消费者”(由目标绑定器创建)
Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。(看下面的 Spring 的消息模型)
Stream 的消息模型
Spring Messaging 模块是 Spring Framework 中的一个模块,其作用就是 统一消息的编程模型。
消息 Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header:
主要就是使用 @Input
和 @Output
注解
其中 @Input
注解的方法返回的是 SubscribableChannel
,@Output
注解的方法返回的是 MessageChannel
。
package org.springframework.messaging;
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
消息通道 MessageChannel 用于接收消息,调用 send 方法可以将消息发送至该消息通道中 :
@FunctionalInterface
public interface MessageChannel {
// 默认不超时
long INDEFINITE_TIMEOUT = -1;
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
boolean send(Message<?> message, long timeout);
}
消息通道里的消息如何被消费呢?
消息通道的子接口可通过订阅消息通道 SubscribableChannel
实现,被 MessageHandler 消息处理器所订阅:
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
所以发送消息时如下
@Autowired
private StreamClient streamClient;
@GetMapping("/send")
public String send() {
// output() 的返回值就是 MessageChannel
// 这里使用了自带的建造者工具(必须使用)
streamClient.output().send(MessageBuilder.withPayload("this is rabbitmq send message!!").build());
return "消息发送成功";
}
配置环境
引入依赖,这里的中间件使用 RabbitMq
两个模块:消息生产者(server-receiver)、消息生产者(server-sender)都需要引入该依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
生产者-sender
编写一个用来传输数据的通道
public interface SenderSource {
// 设置输出的通道标识
String OUTPUT = "myMessage";
@Output(SenderSource.OUTPUT)
MessageChannel output();
}
启动类
/**
* @EnableBinding(SenderSource.class) 表示监听 Stream 通道功能
* SenderSource 为自定义的通道接口
*/
@SpringBootApplication
@EnableBinding(SenderSource.class) // 绑定通道(channel)
public class SenderApplication {
public static void main(String[] args) {
SpringApplication.run(SenderApplication.class, args);
}
}
发送消息
编写一个 Controller 用于发送消息
@RestController
public class SendMsgController {
@Autowired
private SenderSource senderSource;
@GetMapping("/send")
public String sendMsg() {
senderSource.output().send(MessageBuilder.withPayload("this is rabbitmq send message!!").build());
return "发送成功";
}
}
配置文件
spring:
cloud:
stream:
bindings:
myMessage:
# 每个通道下的 destination 属性指 exchange 的名称
destination: mytopic
binder: defaultRabbit # 对应下面的 defaultRabbit
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /test
server:
port: 8081
bindings 配置
- input:表示
channelName
,这里为什么是 myMessage,是因为启动类中@EnableBinding(SenderSource.class)
注解当中配置 SenderSource 接口,该接口中默认定义了 channelName 的名称 - destination:每个通道下的 destination 属性指 exchange 的名称(注意:input、output 双方的交换机要相同)
- binder:当前 bindings 绑定的对应的适配器,该实例表示适配 rabbitmq,名称默认为 defaultRabbit,可以自定义,接着需要配置该名称对应的类型,环境信息等
binders 配置
- defaultRabbit:binder 配置的适配器的名称,和
spring.cloud.stream.bindings.myMessage.binder
值一样 - environment:表示当前 binder 对应的配置信息
消费者-receiver
public interface StreamClient {
// 设置输出的通道标识
String INPUT = "myMessage";
@Input(StreamClient.INPUT)
MessageChannel output();
}
启动类
/**
* @EnableBinding 表示告诉当前应用,增加消息通道的监听功能
* 监听 StreamClient 类中名为 myMessage 的输入通道:
*/
@SpringBootApplication
@EnableBinding(StreamClient.class)
public class ReceiverApplication {
public static void main(String[] args) {
SpringApplication.run(ReceiverApplication.class, args);
}
}
监听服务
@Slf4j
@Component
public class DefaultMessageListener {
@StreamListener(StreamClient.INPUT)
public void processMyMessage(String message) {
log.info("接收到消息:" + message);
}
}
编写配置文件
spring:
cloud:
stream:
bindings:
myMessage:
destination: mytopic # 指定交换机的名称
binder: local_rabbit # 对应下面的 local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
server:
port: 8080
search 服务的例子
编写消费者
让 search 服务来当消费者
引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
在 search 配置文件上连接 RabbitMQ
# 随便指定个名字
spring:
application:
name: search
# 连接 RabbitMQ
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /test
server:
port: 8083
# 如果想要配置多个中间件可以如下这样
spring:
cloud:
stream:
bindings:
myMessage:
destination: mytopic # 指定交换机的名称
binder: local_rabbit # 对应下面的 local_rabbit
group: Customer # 例如指定这个为消费者组
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
创建一个接口用来接收消息
package com.alsritter.stream;
public interface StreamClient {
@Input("myMessage")
SubscribableChannel input();
}
创建一个 StreamReceiver 用来监听消息
这个 @EnableBinding
可以加在启动类,也可以加在这里,反正能扫描到就行了
package com.alsritter.listener;
@Component
@EnableBinding(StreamClient.class) // 代理这个 StreamClient
public class StreamReceiver {
@StreamListener("myMessage")
public void msg(Object msg) {
System.out.println("接收到的消息为" + msg);
}
}
编写生产者
让 Customer 服务来当消费者 同样先引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
配置文件一样连接 RabbitMQ
version: v1
spring:
application:
name: Customer-${version}
# 连接 RabbitMQ
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /test
创建一个接口用来发送消息
package com.alsritter.stream;
public interface StreamClient {
@Output("myMessage")
SubscribableChannel output();
}
可以在 Controller 里发送消息
@Autowired
private StreamClient streamClient;
@GetMapping("/send")
public String send() {
streamClient.output().send(MessageBuilder.withPayload("this is rabbitMQ send message!!").build());
return "消息发送成功";
}
再在启动类里加上 @EnableBinding
注解绑定到这个 StreamClient
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(StreamClient.class)
public class EurekaClientApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaClientApplication.class, args);
}
}
然后访问这个 uri 就能发送消息给 search 服务了
http://localhost:8085/v1/customer/send
重复消费
默认中间件是和 Binder 绑定的,其会使添加服务一个监听就增多一个队列,这样会导致消息被重复消费(就像广播一样),所以需要给这些消费者都配置为同一个组,避免消息被重复消费
spring:
application:
name: Customer-${version}
# 连接 RabbitMQ
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /test
cloud:
stream:
bindings:
myMessage:
group: Customer # 例如指定这个为消费者组
手动 ACK
设置应答模式为 MANUAL
# 随便指定个名字
spring:
application:
name: search
# 连接 RabbitMQ
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /test
cloud:
stream:
rabbit:
bindings:
myMessage:
consumer:
acknowledgeMode: MANUAL # 手动确认 ACK
修改一下消费者的监听器
@Component
@EnableBinding(StreamClient.class) // 先绑定刚才创建的 StreamClient,使消息能传递过去
public class StreamReceiver {
@StreamListener("myMessage")
public void msg(Object msg,
@Header(name = AmqpHeaders.CHANNEL) Channel channel,
@Header(name = AmqpHeaders.DELIVERY_TAG) long deliveryTag
) throws IOException {
// 手动 ACK需要 channel.basicAck(deliveryTag, true);
// //第二个参数:手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(deliveryTag, false);
System.out.println("接收到的消息为" + msg);
}
}